# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys, os, gzip, bz2, urllib.request, urllib.error, urllib.parse, re, warnings, editdistance
from hysop import vprint
from hysop.tools.numpywrappers import npw
from hysop.tools.warning import HysopWarning
from hysop.tools.htypes import first_not_None
def _fmt_name(name):
"""Format vendor and device names for reversed lookup."""
if name is None:
return None
else:
assert isinstance(name, str)
return name.strip().lower()
[docs]
class PciVendor:
    """
    A PCI vendor entry together with the devices registered under it.

    Attributes set by the constructor:
      sid          -- vendor id as a 4 character lowercase hexadecimal string
      id           -- vendor id as an integer
      name         -- vendor name as written in the parsed line
      devices      -- dict filled by _parse_device(); every device is stored
                      under its integer id, its hexadecimal string id and
                      its formatted (stripped, lowercase) name
      device_names -- formatted names of the registered devices
    """

    _regexp = re.compile(r"([0-9a-f]{4})\s+(.*)")

    def __init__(self, vendor):
        """
        Class initializes with the raw line from pci.ids

        Raises ValueError when the line does not look like
        '<4 hex digits> <name>'.
        """
        vendor = vendor.strip()
        match = self._regexp.match(vendor)
        if not match:
            msg = "PCI vendor could not match regexp: {}."
            raise ValueError(msg.format(vendor))
        self.sid = match.group(1)
        self.id = int(self.sid, 16)
        self.name = match.group(2)
        self.devices = {}
        self.device_names = []

    def find_device(self, device_id, subdevice_id=None):
        """
        Look up a device, and optionally one of its subdevices, by id.

        Returns None when the device is unknown. Returns the device itself
        when subdevice_id is None or is not registered for this device,
        otherwise returns the subdevice.
        """
        if (device_id is None) or (device_id not in self.devices):
            return None
        device = self.devices[device_id]
        if (subdevice_id is None) or (subdevice_id not in device.subdevices):
            return device
        return device.subdevices[subdevice_id]

    def find_device_by_name(self, device_name, subdevice_name=None):
        """
        Same lookup as find_device() but keyed by names.

        Both names are normalized with _fmt_name() before the lookup.
        """
        device_name = _fmt_name(device_name)
        subdevice_name = _fmt_name(subdevice_name)
        if (device_name is None) or (device_name not in self.devices):
            return None
        device = self.devices[device_name]
        if (subdevice_name is None) or (subdevice_name not in device.subdevices):
            return device
        return device.subdevices[subdevice_name]

    def find_device_by_distance_to_name(self, device_name):
        """
        Return the registered device whose formatted name has the smallest
        edit distance to device_name (first one wins on ties).

        Raises ValueError when no device has been registered yet.
        """
        device_name = _fmt_name(device_name)
        # A plain min() avoids inspecting the integer type returned by
        # argmin, which depends on the platform, and keeps the
        # first-minimum tie breaking rule.
        closest_name = min(
            self.device_names,
            key=lambda candidate: editdistance.eval(candidate, device_name),
        )
        return self.devices[closest_name]

    def _parse_device(self, device):
        """
        Parse a device and adds it to self.devices
        Input is a raw line from pci.ids
        Returns the parsed device_id.

        A device whose integer id is already registered is not registered
        again; its id is still returned.
        """
        device = PciDevice(device, vendor=self)
        device_id = device.id
        if device_id not in self.devices:
            device_name = _fmt_name(device.name)
            # The same object is reachable by int id, hex string id and name.
            self.devices[device_id] = device
            self.devices[device.sid] = device
            self.devices[device_name] = device
            self.device_names.append(device_name)
        return device_id

    def __str__(self):
        return self.name
[docs]
class PciDevice:
    """
    A PCI device entry belonging to a vendor.

    Attributes set by the constructor:
      vendor     -- the vendor object given to the constructor
      sid        -- device id as a 4 character lowercase hexadecimal string
      id         -- device id as an integer
      name       -- device name as written in the parsed line
      subdevices -- dict filled by _parse_sub_device(); every subdevice is
                    stored under its integer id and its hexadecimal string id
    """

    _regexp = re.compile(r"([0-9a-f]{4})\s+(.*)")

    def __init__(self, device, vendor):
        """
        Initialize from a raw device line of pci.ids.

        Raises ValueError when the line does not look like
        '<4 hex digits> <name>'.
        """
        device = device.strip()
        match = self._regexp.match(device)
        if not match:
            msg = "pci device could not match regexp: {}."
            msg = msg.format(device)
            # Was 'valueerror', an undefined name that produced a NameError.
            raise ValueError(msg)
        self.vendor = vendor
        self.sid = match.group(1)
        self.id = int(self.sid, 16)
        self.name = match.group(2)
        self.subdevices = {}

    def _parse_sub_device(self, subdevice):
        """
        Parse a subdevice and add it to self.subdevices.
        Input is a raw line from pci.ids.
        Returns the parsed subdevice_id.

        A subdevice whose integer id is already registered is not
        registered again; its id is still returned.
        """
        subdevice = PciSubDevice(subdevice, device=self, vendor=self.vendor)
        subdevice_id = subdevice.subdevice_id
        if subdevice_id not in self.subdevices:
            self.subdevices[subdevice_id] = subdevice
            self.subdevices[subdevice.subdevice_sid] = subdevice
        return subdevice_id

    def __str__(self):
        return self.name
[docs]
class PciSubDevice:
    """
    A PCI subdevice entry attached to a device.

    The parsed line carries two 4 digit hexadecimal ids (subvendor then
    subdevice) followed by a name. Each id is kept both as the hexadecimal
    string (*_sid) and as an integer (*_id).
    """

    _regexp = re.compile(r"([0-9a-f]{4})\s+([0-9a-f]{4})\s+(.*)")

    def __init__(self, subdevice, device, vendor):
        """
        Initialize from a raw subdevice line of pci.ids.

        Raises ValueError when the line does not match the expected layout.
        """
        raw_line = subdevice.strip()
        parsed = self._regexp.match(raw_line)
        if not parsed:
            raise ValueError(f"PCI device could not match regexp: {raw_line}.")
        subvendor_hex, subdevice_hex, label = parsed.group(1, 2, 3)
        self.vendor = vendor
        self.device = device
        self.subvendor_sid = subvendor_hex
        self.subvendor_id = int(subvendor_hex, 16)
        self.subdevice_sid = subdevice_hex
        self.subdevice_id = int(subdevice_hex, 16)
        self.name = label

    def __str__(self):
        return self.name
[docs]
class PciDeviceClass:
    """
    A PCI device class entry (lines of the form 'C <2 hex digits> <name>').

    device_subclasses is filled by _parse_sub_class(); every subclass is
    stored under its integer id and its hexadecimal string id.
    """

    _regexp = re.compile(r"C\s+([0-9a-f]{2})\s+(.*)")

    def __init__(self, device_class):
        """
        Initialize from a raw device class line of pci.ids.

        Raises ValueError when the line does not match the expected layout.
        """
        raw_line = device_class.strip()
        parsed = self._regexp.match(raw_line)
        if not parsed:
            raise ValueError(
                f"Pci device class could not match regexp: {raw_line}."
            )
        hex_id, label = parsed.group(1, 2)
        self.sid = hex_id
        self.id = int(hex_id, 16)
        self.name = label
        self.device_subclasses = {}

    def _parse_sub_class(self, subclass):
        """
        Parse a device subclass and add it to self.device_subclasses.
        Input is a raw line from pci.ids.
        Returns the parsed subclass id.
        """
        entry = PciDeviceSubClass(subclass, device_class=self)
        key = entry.id
        if key in self.device_subclasses:
            # Already registered: keep the first entry.
            return key
        self.device_subclasses[key] = entry
        self.device_subclasses[entry.sid] = entry
        return key

    def __str__(self):
        return self.name
[docs]
class PciDeviceSubClass:
    """
    A PCI device subclass entry attached to a device class.

    programming_interfaces is filled by _parse_programming_interface();
    every interface is stored under its integer id and its hexadecimal
    string id.
    """

    _regexp = re.compile(r"([0-9a-f]{2})\s+(.*)")

    def __init__(self, device_subclass, device_class):
        """
        Initialize from a raw device subclass line of pci.ids.

        Raises ValueError when the line does not match the expected layout.
        """
        raw_line = device_subclass.strip()
        parsed = self._regexp.match(raw_line)
        if not parsed:
            raise ValueError(
                f"Pci device subclass could not match regexp: {raw_line}."
            )
        hex_id, label = parsed.group(1, 2)
        self.device_class = device_class
        self.sid = hex_id
        self.id = int(hex_id, 16)
        self.name = label
        self.programming_interfaces = {}

    def _parse_programming_interface(self, interface):
        """
        Parse a programming interface and add it to
        self.programming_interfaces.
        Input is a raw line from pci.ids.
        Returns the parsed interface id.
        """
        entry = PciProgrammingInterface(
            interface, device_subclass=self, device_class=self.device_class
        )
        key = entry.id
        if key in self.programming_interfaces:
            # Already registered: keep the first entry.
            return key
        self.programming_interfaces[key] = entry
        self.programming_interfaces[entry.sid] = entry
        return key

    def __str__(self):
        return "{} ({})".format(self.name, self.device_class.name)
[docs]
class PciProgrammingInterface:
    """
    A PCI programming interface entry attached to a device subclass.

    The id is kept both as a 2 character hexadecimal string (sid) and as
    an integer (id).
    """

    _regexp = re.compile(r"([0-9a-f]{2})\s+(.*)")

    def __init__(self, interface, device_subclass, device_class):
        """
        Initialize from a raw programming interface line of pci.ids.

        Raises ValueError when the line does not match the expected layout.
        """
        raw_line = interface.strip()
        parsed = self._regexp.match(raw_line)
        if not parsed:
            raise ValueError(
                f"Pci device interface could not match regexp: {raw_line}."
            )
        hex_id, label = parsed.group(1, 2)
        self.device_class = device_class
        self.device_subclass = device_subclass
        self.sid = hex_id
        self.id = int(hex_id, 16)
        self.name = label

    def __str__(self):
        return self.name
[docs]
class PCIIds:
    """
    Class used to parse all pci.ids entries.

    This file should contain up to date PCI vendors and device ids.
    The default path that is looked up is '/usr/share/hwdata/pci.ids'
    See http://pciids.sourceforge.net/ to get urls.

    It can be updated using the command 'update-pciids',
    or by providing a custom source file or url in constructor.

    Usage:
        from hysop.backend.topology import PCIIds
        pciids = PCIIds(filepath or url)

        #PCI DEVICES
        pciids.vendors[vendor_id]
              .devices[device_id]
              .subdevices[subdevice_id]

        #PCI DEVICE CLASSES
        pciids.device_classes[class_id]
              .device_subclasses[subclass_id]
              .programming_interfaces[interface_id]

    All ids can be given as hexadecimal strings or as integers.
    """

    def __init__(self, path=None, url=None):
        """
        Loads and parse a pci.ids file.
        The file may be compressed in gzip or bzip2 format.
        url has priority over path.

        When no url is given and loading from path raises OSError, a
        HysopWarning is emitted and the default pciids url is used instead.
        """
        self.vendors = {}
        self.vendor_names = []
        self.device_classes = {}
        self._parsed = False

        path = first_not_None(path, "/usr/share/hwdata/pci.ids")

        if url is not None:
            self.load_from_url(url=url)
        elif path is not None:
            try:
                self.load_from_path(path=path)
            except OSError:
                msg = "'{}' has not been found on your system, default url is used instead."
                msg = msg.format(path)
                warnings.warn(HysopWarning(msg))
                self.load_from_url(url="http://pciids.sourceforge.net/v2.2/pci.ids")

        self.vendor_names = npw.asarray(self.vendor_names, dtype=str)

    def load_from_url(self, url):
        """
        Download a pci.ids file from url and parse it.

        Raises RuntimeError when the server answers with an HTTP error.
        """
        vprint(f"Fetching file from {url}...")
        try:
            # The context manager closes the connection once the body is read.
            with urllib.request.urlopen(url) as response:
                content = response.read()
        except urllib.error.HTTPError as err:
            msg = "\nFailed to download {}, supply a custom path to provide pci.ids\n"
            msg = msg.format(url)
            raise RuntimeError(msg) from err
        vprint("Successfully downloaded pci.ids.")
        self._parse(content)

    def load_from_path(self, path):
        """
        Read a pci.ids file from disk and parse it.

        Supported extensions are '.ids', '.ids.gz' and '.ids.bz2'.
        Raises OSError when path is not an existing file and ValueError
        when the extension is not supported.
        """
        if not os.path.isfile(path):
            msg = f"File '{path}' does not exist."
            raise OSError(msg)
        # Every branch reads text: "ro" is not a valid mode on Python 3 and
        # the parser works on str lines.
        if path.endswith(".ids"):
            with open(path, "r", encoding="utf-8", errors="replace") as f:
                content = f.read()
        elif path.endswith(".ids.gz"):
            with gzip.open(path, "rt", encoding="utf-8", errors="replace") as f:
                content = f.read()
        elif path.endswith(".ids.bz2"):
            with bz2.open(path, "rt", encoding="utf-8", errors="replace") as f:
                content = f.read()
        else:
            msg = "File '{}' has an unknown extensions, valid ones are {}."
            msg = msg.format(path, ["*.ids", "*.ids.gz", "*.ids.bz2"])
            raise ValueError(msg)
        self._parse(content)

    def find_vendor(self, vendor_id):
        """Return the vendor registered under vendor_id, or None."""
        assert self._parsed
        if (vendor_id is None) or (vendor_id not in self.vendors):
            return None
        return self.vendors[vendor_id]

    def find_vendor_by_name(self, vendor_name):
        """Return the vendor registered under the formatted name, or None."""
        assert self._parsed
        vendor_name = _fmt_name(vendor_name)
        if (vendor_name is None) or (vendor_name not in self.vendors):
            return None
        return self.vendors[vendor_name]

    def find_vendor_by_distance_to_name(self, vendor_name):
        """
        Return the vendor whose formatted name has the smallest edit
        distance to vendor_name.
        """
        assert self._parsed
        vendor_name = _fmt_name(vendor_name)
        idx = npw.argmin(
            npw.fromiter(
                (editdistance.eval(vn, vendor_name) for vn in self.vendor_names),
                dtype=npw.int16,
            )
        )
        vendor_name = self.vendor_names[idx]
        if not isinstance(vendor_name, str):
            vendor_name = vendor_name[0]
        return self.vendors[vendor_name]

    def find_device(self, vendor_id, device_id, subdevice_id=None):
        """
        Return the device (or subdevice) identified by the given ids,
        or None when the vendor is unknown.
        """
        vendor = self.find_vendor(vendor_id)
        if vendor is None:
            return None
        return vendor.find_device(device_id, subdevice_id)

    def find_device_by_name(self, vendor_name, device_name, subdevice_name=None):
        """
        Return the device (or subdevice) identified by the given names,
        or None when the vendor is unknown.
        """
        vendor = self.find_vendor_by_name(vendor_name)
        if vendor is None:
            return None
        return vendor.find_device_by_name(device_name, subdevice_name)

    def find_device_by_id(self, id):
        """
        Look up a device from a packed id given as an integer or as a
        hexadecimal string.

        The id is split into 16 bit fields, most significant first:
        vendor/device/subdevice above 0xFFFFFFFF, vendor/device above
        0xFFFF, vendor only otherwise.
        Raises ValueError when the id does not fit in 48 bits.
        """
        if isinstance(id, str):
            id = int(id, 16)
        if id > 0xFFFFFFFFFFFF:
            raise ValueError(id)
        elif id > 0xFFFFFFFF:
            vendor_id = (id & 0xFFFF00000000) >> (2 * 16)
            device_id = (id & 0x0000FFFF0000) >> (1 * 16)
            subdevice_id = (id & 0x00000000FFFF) >> (0 * 16)
        elif id > 0xFFFF:
            vendor_id = (id & 0xFFFF0000) >> (1 * 16)
            device_id = (id & 0x0000FFFF) >> (0 * 16)
            subdevice_id = None
        else:
            vendor_id = (id & 0xFFFF) >> (0 * 4)
            device_id = None
            subdevice_id = None
        return self.find_device(vendor_id, device_id, subdevice_id)

    def find_device_class(
        self, device_class_id, device_subclass_id=None, programming_interface=None
    ):
        """
        Return the most specific entry matching the given ids.

        Returns None when the class is unknown, the class when the subclass
        is None or unknown, the subclass when the programming interface is
        None or unknown, and the programming interface otherwise.
        """
        assert self._parsed
        if (device_class_id is None) or (device_class_id not in self.device_classes):
            return None
        device_class = self.device_classes[device_class_id]

        if (device_subclass_id is None) or (
            device_subclass_id not in device_class.device_subclasses
        ):
            return device_class
        device_subclass = device_class.device_subclasses[device_subclass_id]

        if (programming_interface is None) or (
            programming_interface not in device_subclass.programming_interfaces
        ):
            return device_subclass
        return device_subclass.programming_interfaces[programming_interface]

    def find_device_class_by_id(self, id):
        """
        Look up a device class from a packed id given as an integer or as
        a hexadecimal string.

        The id is split into 8 bit fields, most significant first:
        class/subclass/interface above 0xFFFF, class/subclass above 0xFF,
        class only otherwise.
        Raises ValueError when the id does not fit in 24 bits.
        """
        if isinstance(id, str):
            id = int(id, 16)
        if id > 0xFFFFFF:
            raise ValueError(id)
        elif id > 0xFFFF:
            device_class_id = (id & 0xFF0000) >> (2 * 8)
            device_subclass_id = (id & 0x00FF00) >> (1 * 8)
            programming_interface_id = (id & 0x0000FF) >> (0 * 8)
        elif id > 0xFF:
            device_class_id = (id & 0xFF00) >> (1 * 8)
            device_subclass_id = (id & 0x00FF) >> (0 * 8)
            programming_interface_id = None
        else:
            device_class_id = (id & 0xFF) >> (8 * 0)
            device_subclass_id = None
            programming_interface_id = None
        return self.find_device_class(
            device_class_id, device_subclass_id, programming_interface_id
        )

    def _parse(self, content):
        """
        Parse the content of a pci.ids file and fill self.vendors and
        self.device_classes.

        content may be str or bytes (bytes are decoded as UTF-8).
        The nesting level of a line is given by its leading tabs: none for
        a vendor or a device class ('C ...'), one for a device or subclass,
        two for a subdevice or programming interface.
        Raises RuntimeError when a nested line has no parent entry.
        """
        if isinstance(content, (bytes, bytearray)):
            # Downloads give bytes, the line splitting below needs str.
            content = content.decode("utf-8", errors="replace")

        msg = "Something went wrong during parsing."
        # Parser state; initialized so that a nested line seen before any
        # parent raises RuntimeError instead of an unbound name error.
        parse_vendor = False
        parse_device_class = False
        vendor_id = device_id = None
        device_class_id = device_subclass_id = None

        for line in content.split("\n"):
            if (not line.strip()) or line.startswith("#"):
                continue
            elif line.startswith("\t\t"):
                if parse_vendor and device_id is not None:
                    self.vendors[vendor_id].devices[device_id]._parse_sub_device(line)
                elif parse_device_class and device_subclass_id is not None:
                    self.device_classes[device_class_id].device_subclasses[
                        device_subclass_id
                    ]._parse_programming_interface(line)
                else:
                    raise RuntimeError(msg)
            elif line.startswith("\t"):
                if parse_vendor:
                    device_id = self.vendors[vendor_id]._parse_device(line)
                elif parse_device_class:
                    device_subclass_id = self.device_classes[
                        device_class_id
                    ]._parse_sub_class(line)
                else:
                    raise RuntimeError(msg)
            elif line.lstrip().startswith("C"):
                device_class_id = self._parse_device_class(line)
                # Forget the previous parent so nested lines cannot attach
                # to an entry of another class.
                device_subclass_id = None
                parse_device_class = True
                parse_vendor = False
            else:
                vendor_id = self._parse_vendor(line)
                # Same reset for the device level.
                device_id = None
                parse_device_class = False
                parse_vendor = True
        self._parsed = True

    def _parse_vendor(self, line):
        """
        Parse a vendor line and register the vendor under its integer id,
        its hexadecimal string id and its formatted name.
        Returns the integer vendor id.
        """
        vendor = PciVendor(line)
        vendor_id = vendor.id
        if vendor_id not in self.vendors:
            vendor_name = _fmt_name(vendor.name)
            self.vendors[vendor_id] = vendor
            self.vendors[vendor.sid] = vendor
            self.vendors[vendor_name] = vendor
            self.vendor_names.append(vendor_name)
        return vendor_id

    def _parse_device_class(self, line):
        """
        Parse a device class line and register the class under its integer
        id, its hexadecimal string id and its formatted name.
        Returns the integer device class id.
        """
        device_class = PciDeviceClass(line)
        device_class_id = device_class.id
        if device_class_id not in self.device_classes:
            device_class_name = _fmt_name(device_class.name)
            self.device_classes[device_class_id] = device_class
            self.device_classes[device_class.sid] = device_class
            self.device_classes[device_class_name] = device_class
        return device_class_id